package hbase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DeleteColumnFamilyState;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.htrace.fasterxml.jackson.databind.Module.SetupContext;
//import org.junit.Before;
//import org.junit.Test;

import com.sun.java_cup.internal.runtime.Symbol;


public class SearchStudent {
    //与HBase数据库的连接对象
    public static Connection connection;

    //数据库元数操作对象
    public static Admin admin;


    public static void main(String[] args) {
        try {
            System.out.println("开始执行本程序");
            System.out.println("HBaseDemo.main()->admin1:" + admin);
            setUp();

            //这里调用下面的方法来操作hbase
            createTable("helloWorld");


        } catch (Exception e) {
            e.printStackTrace();
        }

    }


    public static void setUp() throws Exception {
        //取得一个数据库配置参数对象
        Configuration conf =HBaseConfiguration.create();


        //设置连接参数:hbase数据库所在的主机IP
        conf.set("hbase.zookeeper.quorum", "198.168.43.11");
        //设置连接参数:hbase数据库使用的接口
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        //取得一个数据库连接对象
        connection=ConnectionFactory.createConnection(conf);

        //取得一个数据库元数据操作对象
        admin=connection.getAdmin();
        System.out.println("HBaseDemo.setUp()->admin:" + admin);
    }



    /**
     * 创建表的方法，输入参数为表名

     */
    public static void createTable(String tableNameString) throws IOException {
        System.out.println("-------------------创建表开始了哦------------------------");


        //新建一个数据表表名对象
        TableName tableName=TableName.valueOf(tableNameString);
        System.out.println("HBaseDemo.createTable()->tabelName:" + tableName);
        //if新建的表存在
        if(admin.tableExists(tableName)) {
            System.out.println("表已经存在！");
        }

        //if需要新建的表不存在
        else {

            //数据表描述对象
            HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
            System.out.println("HBaseDemo.createTable()->hTableDescriptor:" + hTableDescriptor);

            //数据簇描述对象
            HColumnDescriptor family = new HColumnDescriptor("base");
            System.out.println("HBaseDemo.createTable()->family:" + family);
            //在数据表中新建一个列簇
            hTableDescriptor.addFamily(family);

            //新建数据表
            admin.createTable(hTableDescriptor);
            System.out.println("HBaseDemo.createTable()->admin3:" + admin);
        }
        System.out.println("-----------------创建表结束 ---------------");
    }


    /**
     * 查询表中的数据
     */
    public static void queryTable(String tableNameString) throws IOException {
        System.out.println("--------------------查询整表的数据--------");

        //获取数据表对象
        Table table = connection.getTable(TableName.valueOf(tableNameString));

        //获取表中的数据
        ResultScanner scanner = table.getScanner(new Scan());

        //循环输出表中的数据
        for (Result result : scanner) {

            byte[] row = result.getRow();
            System.out.println("row key is:"+new String(row));

            List<Cell> listCells = result.listCells();
            for (Cell cell : listCells) {

                byte[] familyArray = cell.getFamilyArray();
                byte[] qualifierArray = cell.getQualifierArray();
                byte[] valueArray = cell.getValueArray();

                System.out.println("row value is:"+ new String(familyArray) +
                        new String(qualifierArray) + new String(valueArray));
            }
        }
        System.out.println("---------------查询整表数据结束----------");
    }


    public static void queryTableByRowKey(String tableNameString,String rowNameString) throws IOException {
        System.out.println("---------------按行建查询表中数据--------");

        //取得数据表对象
        Table table=connection.getTable(TableName.valueOf(tableNameString));

        //新建一个查询对象作为查询条件
        Get get = new Get(rowNameString.getBytes());

        //按行查询数据
        Result result = table.get(get);

        byte[] row = result.getRow();
        System.out.println("row key is:" +new String(row));

        List<Cell> listCells = result.listCells();
        for (Cell cell : listCells) {
            byte[] familyArray = cell.getFamilyArray();
            byte[] qualifierArray = cell.getQualifierArray();
            byte[] valueArray = cell.getValueArray();

            System.out.println("row value is:"+ new String(familyArray) +
                    new String(qualifierArray) + new String(valueArray));
        }
        System.out.println("---------------查行键数据结束----------");
    }


    public static void queryTableCondition(String tableNameString) throws IOException {

        System.out.println("------------------按条件查询---------------");

        //取得数据表对象
        Table table = connection.getTable(TableName.valueOf(tableNameString));

        //创建查询器

        Filter filter = new SingleColumnValueFilter(Bytes.toBytes("base"),
                Bytes.toBytes("name"), CompareOp.EQUAL, Bytes.toBytes("bookName"));

        //创建扫描器
        Scan scan = new Scan();

        //将查询过滤器的加入到数据表扫描器对象
        scan.setFilter(filter);

        //执行查询操作，并获取查询结果

        ResultScanner scanner =table.getScanner(scan);

        //输出结果

        for (Result result : scanner) {
            byte[] row = result.getRow();
            System.out.println("row key is:" + new String(row));

            List<Cell> listCells = result.listCells();
            for (Cell cell : listCells) {

                byte[] familyArray = cell.getFamilyArray();
                byte[] qualifierArray = cell.getQualifierArray();
                byte[] valueArray = cell.getValueArray();

                System.out.println("row value is:" + new String(familyArray)
                        + new String(qualifierArray) + new String(valueArray));
            }

        }

        System.out.println("------------------------按条件查询结束--------------------");

    }


    /**
     * 这是清空表的函数，用以使表变得无效
     */
    public static void truncateTable(String tableNameString) throws IOException {

        System.out.println("-------------------------清空表开始------------------");

        //取得目标数据表的表明对象
        TableName tableName = TableName.valueOf(tableNameString);

        //设置表状态为无效
        admin.disableTable(tableName);
        //清空指定表的数据
        admin.truncateTable(tableName, true);

        System.out.println("-------------------------清空表结束-----------------");
    }

    /**
     * 删除指定的表，输入值为表名
     * @param tableNameString
     * @throws IOException
     */
    public static void deleteTable(String tableNameString) throws IOException {
        System.out.println("-----------------------删除表---------------");

        // 设置表的状态为无效
        admin.disableTable(TableName.valueOf(tableNameString));

        //删除指定的表
        admin.deleteTable(TableName.valueOf(tableNameString));

        System.out.println("-------------------------删除表-----------------------");

    }

    /**
     * rowkey 是第二层，一行有很多的数据
     *
     */
    public static void deleteByRowKey(String tableNameString, String rowKey) throws IOException {
        System.out.println("删除行开始");

        //获取待操作的数据表对象
        Table table =connection.getTable(TableName.valueOf(tableNameString));

        //创建删除条件对象
        Delete delete = new Delete(Bytes.toBytes(rowKey));

        table.delete(delete);

        System.out.println("删除行结束");
    }

    /**
     * 新建一个列簇，第一个是表名，第二个是列簇名
     * @param tableNameString
     * @param columnFamily
     * @throws IOException
     */
    public static void addColumnFamily(String tableNameString, String columnFamily) throws IOException {
        System.out.println("新建列簇开始");

        //取得目标数据表的标明对象
        TableName tableName = TableName.valueOf(tableNameString);

        //创建列簇对象
        HColumnDescriptor columnDescriptor = new HColumnDescriptor(columnFamily);

        //将新建的加入到指定的数据表
        admin.addColumn(tableName, columnDescriptor);

        System.out.println("新建列簇结束");
    }

    /**
     * 删除列簇的函数，第一个是表名，第二个是列簇名
     * @param tableNameString
     * @param columnFamily
     * @throws IOException
     */
    public static void DeleteColumnFamily(String tableNameString, String columnFamily) throws IOException {
        System.out.println("删除列簇开始");

        //取得目标数据表的表明对象
        TableName tableName = TableName.valueOf(tableNameString);

        //删除指定数据表中的指定列簇

        admin.deleteColumn(tableName, columnFamily.getBytes());

        System.out.println("删除列簇成功");
    }

    /**
     * 是插入的一个函数，插入的是一个put的list。具体创建方法
     * List<Put> putList = new ArrayList<put>();
     * Put put;
     * for(int i = 0; i < 10; i++){
     *  put = new Put(Bytes.toBytes("row" + i));
     *  put.addColumn(Bytes.toBytes("Base")//列簇,Bytes.toBytes("name")//列名,Bytes.toBytes("bookName")//值);
     *  putList.ad(put);
     * }
     *
     * @param tableNameString
     * @param putList
     * @throws IOException
     */
    public static void insert(String tableNameString, List<Put> putList) throws IOException {
        System.out.println("开始执行插入操作");

        //取得一个数据表对象
        Table table = connection.getTable(TableName.valueOf(tableNameString));

        //将数据插入到数据库中
        table.put(putList);

        System.out.println("插入成功");

    }
}
